Hadoopソースコードリーディング 第16回に参加してきました
Hadoopソースコードリーディング 第16回に参加してきました。今回は1.0がリリースされる目前のApache Sparkがテーマでした。
NTTデータ濱野さんの冒頭の挨拶
- Spark1.0リリースを記念する予定が、されていないw
- 今回はお酒を飲んでグダグダする時間はないw
Apache Sparkのご紹介(前半)
NTTデータ土橋さん
まずは土橋さんからSparkの背景やSpark Summit 2013の振り返り、Sparkの基本についての説明がありました。詳細はスライドを見てもらった方がいいですが、さくっと雰囲気を掴みたい方は以下のメモをご参照下さい。
- 土橋さん
- 6年前からHadoopに関わっている。
- 基本はインフラエンジニア
- Ansible使っている。
- アジェンダ
- Sparkの背景
- Spark Summit 2013振り返り
- Sparkのキホン
- RDD
- スケジューラ
- 前提
- 机上調査+ソースコード調査をもとにしている。
- 動作確認しながらの内容は含まれない
- Scalaは怖くないw
- Sparkとは
- 大規模データの分散処理をオンメモリで実現する
- 都度HDDに書き込まない
- Hadoopとアーキテクチャが異なる。
- UCBerkeleyのResilent distributed datasets(RDD)の論文がおおもと
- 大規模データの分散処理をオンメモリで実現する
- Hadoopで繰り返し処理ではIOコストが課題
- MRジョブごとにHDFSに保存するコスト
- Hadoopの外側とのデータ転送コスト
- Hadoopはパワフルだけど…
- もう少し繰り返し計算を効率よく出来ないか?200段のジョブフロー…
- インタラクティブなドリルダウン分析に使えないか?列指向とかSQL以外。
- Sparkは大量データを次々に変換する処理が得意
- JavaやScalaのコレクション操作のようなメソッドやフレームワークを利用できる。
- Hadoopを置き換えるものではなく、SparkはHadoopの仕組みも利用する。
- 利用例
- 得意・不得意
- Hadoopなどを含みエコシステムで成り立つ
- RDBMS、Hadoop、Spark、Storm
- Spark Summit 2013
- 2013-12-02, 03
- Databricks社CEO
- Sparkの上で全部やれるようになることを目指している
- Hadoopとは共存する
- Hadoopコミュニティの参加者がいた
- Yahoo!の事例が目立った
- Hadoopの処理を一部Sparkに置き換えた
- StormやTezに比べてもGitHubのコミット量が多い
- 対応スタックの図があった。
- 動かし方
- Mesos
- 単体(スタンドアロン)←一番多いらしい
- YARN
- Shark: SQL
- クエリやUDFはHive互換
- Spark Streaming
- MLlib: 機械学習
- 最近、MahoutもSparkに移行すると宣言
Apache Sparkのご紹介(後半)
NTTデータ猿田さん
猿田さんからはもう少しSparkの詳細ということで、RDDやジョブ、スケジューラについて説明がありました。
- 猿田さん
- Hadoop徹底入門/Hadoop Hacks
- 典型的な処理イメージ
- RDDという抽象データセットの変換を繰り返して目的の結果を得る
- 一連の処理を「ジョブ」と呼ぶ
- もう少し複雑な処理イメージ
- 処理イメージ図!
- RDD
- パーティションに分割され、サーバ上で分散配置される
- インメモリで保持される(なるべくディスクに書かない)
- 遅延計算される
- イミュータブル(フォールトトレランス性を保持するため)
- 遅延計算
- 変換関数はその時点では実行されない(計算がスケジューリングされない)
- アクションで実行される(計算がスケジューリングされる)
- フォールトトレランス性を実現するための特性
- 得たいデータが失われていたら前のデータから再生成するというアプローチをとっている。そのため以下の制約・前提条件を持っている。
- RDDはイミュータブル
- 元のデータソースは信頼性が高く、イミュータブルである
- 生きている範囲から復元する。
- 得たいデータが失われていたら前のデータから再生成するというアプローチをとっている。そのため以下の制約・前提条件を持っている。
- RDDの依存関係
- 狭い依存関係(Narrow Dependency)
- 広い依存関係(Wide Dependency)
- 親パーティションが、複数の個パテーションの生成に関わる依存関係
- RDDの変換がクラスタ上でどのように動作するか
- クラスタを構成するノード
- クライアント
- マスタ
- ワーカ
- ノード以外の主要な要素
- ドライバ
- エグゼキュータ
- タスク
- スケジューラ
- 図にまとまっている!後で見なおしたい!!
- クラスタを構成するノード
- タスクの生成、実行までの流れ
- 系譜をステージに分割
- ステージの実行要否を判定
- タスクを生成
- タスクを実行する場所を決定
- タスクの実行順序をスケジューリング
- 系譜をステージに分割
- DAGSchedulerが系譜をステージに分割する
- ステージ分割のステップ
- 最後から辿っていく
- ステージの実行要否を判定する
- 同一のスケジューラで制御されるジョブ間でRDDを共有できる
- 共有するRDDが計算済みであれば流用する。なければ生成する。
- タスクの生成
- ステージ内の最後のRDDパーティション数からタスク数が決定される。
- タスクの実行場所の決定
- プリファードロケーションは入力によって決まる。HDFSの場合はノードの位置が分かるためデータローカリティが保証される。
- タスクの実行順書のスケジューリング
- ステージを構成するタスク群は「タスクセット」としてTaskSchedulerに渡される。
- TaskSchedulerはタスクセット単にで実行順序のスケジューリングを行う。
- 論文紹介
- 元々の論文に考え方が書いてある。
QA
- Q. 系譜毎にスケジューラは作られる?
- A. Spark Contextごとに作られる
休憩中
個人的にわからない点がいくつあったので、土橋さん、猿田さんに質問してきました。
- Q. RDDの処理途中に失敗した場合、その直前から再生成するとの話だっけど、そうするとジョブフロー全体のRDDを保持するからメモリの消費量半端無いのでは?
- A. 処理の流れとしてまず最後の部分から必要なRDDを辿っていく形になっている。そして、手前のRDDが存在すれば利用するし、存在しなければ更に手前のRDDから生成するという流れになっている。RDDのパーティション単位にメモリの上限の設定があり、その範囲でキャッシュ出来る範囲でキャッシュして、溢れたら捨てる仕組みになっている。
- Q. ディスクに書き出す制御とか出来るか?
- A.StorageLevelやcacheの指定、checkpointの指定などが出来る
- Q. アクションで永続化?
- A. アクションによる。ドライバーにArrayを返すだけのアクションももある
- Q. 複数のアクションの結果を統合することも可能?
- A. 可能。アクションはあくまでスケジューリングするだけ。マルチスレッドで複数のアクションをスケジューリングすることもできる。そしてそれぞれのアクションの結果をまとめて処理することが出来る
Spark Internals
@taroleo さん/Treasure Data
@taroleoさんからはSparkのソースを見ながら内部構造についての説明がありました。IntelliJとScalaの布教も兼ねている感じでしたw
-
- 自己紹介
- 数カ月前まで東京大学で働いて、Treasure Dataに入った。
- Spark Code Base Size
- 2012年はVersion0.6で20,000行
- 2014年はbranch-1.0で50,000行
- コアは増えてなくてドキュメントとかだった。
- ScalaなのでJava換算だと10倍ぐらいになると思ったほうがいいかも
- Core Developer
- メインは3, 4人で書かれている。
- IntelliJ Tips
- Scalaを読むなら一択
- RDDのソースを見ながら説明
- Scala Console(REPL)
- brew install scala で入る
- RDDにはScalaのコレクションと同じメソッドが実装されている。
- Scala Basics
- object
- 簡単にSingletonが実装できる
- Packege-privagte scope
- Pattern matching
- Case classes
- Immutable and serializable
- まとめて1つのファイルに書ける
- パターンマッチに使える
- Akka Actorを使ってネットワーク間の通信もローカルと同じように書ける。
- 通信はNettyを使っている。
- object
- Scala Cookbook(http://xerial.org/scala-cookbook/)
- 学生さん向けに書いたもの。
- これでScalaを使うようになってくれると嬉しい
- Components
- ソース見ると実際に処理するか分かる。
- mapやfilterは単にインスタンスを生成しているだけ。
- ソース見ると実際に処理するか分かる。
- RDD
- Resilientは「打たれ強い」という意味
- RDD Iterator
- StorageLevelを指定できる
- Delya Scheduling
- なるべく近いところからデータをreadできるようにタスクを配置する
- ClouserSerializer
- ASM4 libraryを使ってbyte codeを調べて使っているもののみClouserに含めるようにしている!
- 実際には関数呼び出しの先までは見てないので要注意!いつの間にか馬鹿でかいオブジェクトが送られていることがある。
- javap -v ...で確認できる。
- ASM4 libraryを使ってbyte codeを調べて使っているもののみClouserに含めるようにしている!
- Cache/Block Manager
- Cache Managerは何もしていない。実際はBlock manager
- Block Manager
- Serialize/Deserialize
- Store
- ShuffleStoreはvalueだけでなくkeyが増える
- SparkEnv
- ノード間で共有する情報?
- HadoopRDD
- Mesos Scheduler Fine Grained
- Mesosから毎回JVMを渡される。そのためJVMの生成コストがかかる。
- Coarse-grained Mesos Scheduler
- MesosのリソースをSparkが専有するので、それはそれで微妙。
- GoogleのOmegaみたいにクラスタ全体を見る感じになれば
- YARNに期待
- 自己紹介
QA
-
-
- Q. (IntelliJについて)grepとの違いは?
- A. 型情報を元に検索するので精度が高い
- Q. StorageLevelは誰が切り替える?
- A. ジョブの最初のConfigurationで指定
- Q. ループの途中で書き込むとか指定できる?
- A. チェックポイントという機能があって、明示的に指定できる
- 引数にStorageLevelを指定できるので、そこで制御するのでは
- map().filter.cache()とかmap().filter().persist()とかで指定
- cache指定のものはメモリから捨てる実装になっているが、間に合わないとOOMとなる。
- Q. (IntelliJについて)grepとの違いは?
-
感想など
実際に利用することを考えるとハッシュタグでも流れていましたが運用やデバッグがつらそうな印象でした。運用に関してはまだ情報自体が少ないですし、歴史自体も浅いですし。デバッグについては、要はDSLなのでコンパイル後のタスクがどうなるかも含めて把握しないと調査できない印象でした。その点、JavaのMapReduceは書いたままのJavaコードが動くので、それと比べると大変なんだろうなーと。
久しぶりに参加できたHadoopソースコードリーディングでしたが、今回はお金とってもいいと感じるぐらいのクオリティでした!しかも全セッションのスライドまで公開されています!!Sparkは概要しか知らなかったのですが、今回参加したおかげで今後調査する時間をかなり短縮できると思います。Sparkのソースを読むためにIntelliJも使いたくなりましたw主催者、発表者、参加者のみなさま、ありがとうございました!